import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.types.Row;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * Demo job: joins an order stream against a temporal table function built over
 * a currency-rate history, using processing-time attributes on both sides.
 *
 * <p>The joined table is converted to a retract stream and printed to stdout.
 *
 * <p>NOTE(review): both inputs are small in-memory collections, so which rate
 * version (if any) an order is joined with presumably depends on the relative
 * arrival timing of the two streams - confirm before relying on the output.
 */
public class JoinwithTemporalTable_proctime
{

    /**
     * Builds the rates temporal table function and the Orders view, joins them
     * and runs the job.
     *
     * @param args command-line arguments; not used
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception
    {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        // No event-time characteristic or watermark interval is configured here:
        // every time attribute in this job is declared with proctime(), and no
        // timestamp/watermark assigner is attached to either stream.

        // ---------------------------- rates side ----------------------------
        // History of (currency, rate) updates; "Euro" appears three times so the
        // temporal table has several versions for that key.
        List<Tuple2<String, Long>> ratesHistoryData = Arrays.asList(
                Tuple2.of("US Dollar", 102L),
                Tuple2.of("Euro", 114L),
                Tuple2.of("Yen", 1L),
                Tuple2.of("Euro", 116L),
                Tuple2.of("Euro", 119L));
        DataStream<Tuple2<String, Long>> ratesHistoryStream = env.fromCollection(ratesHistoryData);
        Table ratesHistory = tableEnv.fromDataStream(
                ratesHistoryStream,
                $("r_currency"), $("r_rate"), $("r_proctime").proctime());

        // Temporal table function versioned by r_proctime and keyed by r_currency.
        // The Expression-based overload is used instead of the string-based one,
        // matching the $(...) DSL used throughout the rest of this method.
        TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
                $("r_proctime"),
                $("r_currency"));
        tableEnv.registerFunction("rates", rates);

        // ---------------------------- orders side ---------------------------
        DataStream<Tuple2<String, Long>> orderA = env.fromCollection(Arrays.asList(
                Tuple2.of("US Dollar", 102L),
                Tuple2.of("US Dollar", 102L),
                Tuple2.of("US Dollar", 102L)));

        // NOTE(review): "o_ratet" looks like a typo for "o_rate"; the column name
        // is left unchanged because it is part of the table schema.
        tableEnv.createTemporaryView(
                "Orders", orderA,
                $("o_currency"), $("o_ratet"), $("o_proctime").proctime());

        // ------------------------------- join --------------------------------
        // For each order, look up the rates table as of the order's processing
        // time and keep the rows whose currency matches.
        Table result = tableEnv.from("Orders")
                .joinLateral(
                        call("rates", $("o_proctime")),
                        $("o_currency").isEqual($("r_currency")));

        tableEnv.toRetractStream(result, Row.class).print();

        env.execute("");
    }

}

/*

Reference:
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/TemporalTableFunctionJoinTest.scala
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala

*/